package com.dduggs.rocketmq;

import com.alibaba.rocketmq.client.consumer.*;
import com.alibaba.rocketmq.client.consumer.listener.*;
import com.alibaba.rocketmq.client.exception.*;
import com.alibaba.rocketmq.common.message.*;
import com.alibaba.rocketmq.common.protocol.heartbeat.*;
import com.dduggs.utils.HessianUtils;
import com.dduggs.utils.PropertyFileUtil;
import lombok.Data;
import org.apache.commons.lang3.*;
import org.slf4j.*;
import org.springframework.transaction.*;

import java.io.*;
import java.net.*;
import java.util.*;

/**
 * Base implementation for RocketMQ push consumers.
 *
 * <p>Subclasses name the topic they consume via {@link #getTopic()}, implement the
 * actual business handling in {@link #doConsumeMessage(MsgObj)} and decide how to
 * record messages that exhausted their retries in
 * {@link #doLogErrorConsumeMessage(MsgObj)}. This class wires up the
 * {@link DefaultMQPushConsumer}, decodes message bodies, sets the redelivery delay
 * and writes monitoring logs around every consume call.
 *
 * @author Peter Zhang
 */
@Data
public abstract class BaseConsumer implements MessageListenerConcurrently {

    /** Property key (config file and JVM system property) holding the name server address. */
    private static final String NAME_SERVER_PROPERTY = "rocketmq.namesrv.domain";

    // Delayed-message configuration of the brokers:
    // production:  messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 40m 50m 1h 2h 6h
    // development: messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 40m 50m 1h 2h 6h 12h 1d
    /** Delay level used for the next redelivery, indexed by the number of retries so far. */
    private static final int[] DELAY_LEVELS = new int[]{3, 5, 9, 14, 15, 16, 17, 18, 19, 20, 21};

    protected final Logger logger = LoggerFactory.getLogger(BaseConsumer.class);

    protected DefaultMQPushConsumer consumer;
    protected String nameServer;
    protected int minConsumeThread = 2;
    protected int maxConsumeThread = 5;
    protected String group;
    protected String subExpression;
    protected Topic topic;
    protected Class<? extends Topic> topicType;
    protected int maxRetryCount = 10;

    /**
     * Creates, configures and starts the underlying push consumer.
     *
     * <p>Nothing is started when the name server property is blank or equals
     * {@code "localTest"}; a warning is logged instead. A failure of
     * {@code consumer.start()} is logged and swallowed (the method returns
     * normally), whereas a failure while subscribing is propagated.
     *
     * @throws MQClientException if subscribing to the topic fails
     */
    public void init() throws MQClientException {

        nameServer = PropertyFileUtil.get(NAME_SERVER_PROPERTY);

        if (StringUtils.isBlank(nameServer)) {
            logger.warn("【MQ init】property rocketmq.namesrv.domain not found");
            return;
        }

        if ("localTest".equals(nameServer)) {
            logger.warn("【MQ init】localTest");
            return;
        }

        // Publish the address as a system property unless one is already set.
        if (StringUtils.isBlank(System.getProperty(NAME_SERVER_PROPERTY))) {
            System.setProperty(NAME_SERVER_PROPERTY, nameServer);
        }

        topicType = getTopic();
        topic = RocketMqUtils.getTopic(topicType);

        // Derive a default consumer group from topic and tags when none was configured.
        if (StringUtils.isBlank(group)) {
            group = "S_" + topic.getTopic() + "_" + topic.getTags();
        }

        consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr(nameServer);
        consumer.setMessageModel(getMessageModel());
        consumer.setConsumeThreadMin(minConsumeThread);
        consumer.setConsumeThreadMax(maxConsumeThread);
        consumer.setVipChannelEnabled(false);

        // Optional: a host-specific instance name allows several consumers to run side by side.
        try {
            consumer.setInstanceName("DEFAULT_CONSUMER-" + InetAddress.getLocalHost().getHostName());
        } catch (UnknownHostException e) {
            logger.error("MQ getHostName error", e);
        }

        // Subscribe to the topic; fall back to the topic's own tags as the filter expression.
        if (StringUtils.isBlank(subExpression)) {
            subExpression = topic.getTags();
        }
        consumer.subscribe(topic.getTopic(), subExpression);

        try {
            consumer.registerMessageListener(this);
            consumer.start();
        } catch (MQClientException e) {
            logger.error("consumer start error!topic={},subExpression={},group={}",
                    topic.getTopic(), subExpression, group, e);
            // Do not fall through to the success log below: the consumer did not start.
            return;
        }

        logger.info("consumer start! topic={},subExpression={},group={}", topic.getTopic(), subExpression, group);
    }

    /**
     * Message model used by the consumer; subclasses may override.
     *
     * @return {@link MessageModel#CLUSTERING} by default
     */
    protected MessageModel getMessageModel() {
        return MessageModel.CLUSTERING;
    }

    /**
     * Shuts the consumer down if {@link #init()} created one.
     */
    public void destroy() {

        if (consumer == null) {
            return;
        }

        consumer.shutdown();

        logger.info("consumer shutdown! topic={},subExpression={},group={}", topic.getTopic(), subExpression, group);
    }

    /**
     * Names the topic definition this consumer subscribes to.
     *
     * @return the topic type resolved through {@code RocketMqUtils.getTopic}
     */
    public abstract Class<? extends Topic> getTopic();

    /**
     * Implemented by subclasses to record a message that failed and will not be
     * retried any more.
     *
     * @param msgObj the failed message wrapper, with its error message set
     */
    public abstract void doLogErrorConsumeMessage(MsgObj msgObj);

    /**
     * Implemented by subclasses to perform the actual consumption.
     *
     * @param msgObj wrapper holding the decoded messages and consume context
     * @return the consume status reported back to the client
     */
    public abstract ConsumeConcurrentlyStatus doConsumeMessage(MsgObj msgObj);

    /**
     * Listener entry point: decodes the batch, delegates to
     * {@link #doConsumeMessage(MsgObj)} and writes monitoring logs around the call.
     *
     * <p>An empty or {@code null} batch is acknowledged as successfully consumed.
     * Message id and retry count are taken from the first message of the batch.
     * Bodies that cannot be decoded appear as {@code null} entries in the list
     * handed to the subclass.
     *
     * @param msgs    messages delivered by the client
     * @param context consume context; its next-consume delay level is set here
     * @return the status returned by {@link #doConsumeMessage(MsgObj)}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        long startTime = System.currentTimeMillis();

        if (msgs == null || msgs.isEmpty()) {
            logger.error("receive empty msg!");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        MessageExt firstMessage = msgs.get(0);

        logger.info("receive_message:topic={},tags={},msgId={}",
                firstMessage.getTopic(), firstMessage.getTags(), firstMessage.getMsgId());

        List<Serializable> msgList = new ArrayList<>(msgs.size());
        for (MessageExt message : msgs) {
            msgList.add(decodeMsg(message));
        }

        final int reConsumeTimes = firstMessage.getReconsumeTimes();

        MsgObj msgObj = new MsgObj();
        msgObj.setReConsumeTimes(reConsumeTimes);
        msgObj.setMsgList(msgList);
        msgObj.setContext(context);
        // Applies only if the subclass asks for redelivery.
        context.setDelayLevelWhenNextConsume(getDelayLevelWhenNextConsume(reConsumeTimes));
        msgObj.setMsgId(firstMessage.getMsgId());

        ConsumeConcurrentlyStatus status = doConsumeMessage(msgObj);

        logger.info("{}-{},consumeStatus={},msgId={},cost:{}",
                firstMessage.getTopic(), firstMessage.getTags(),
                status, firstMessage.getMsgId(), System.currentTimeMillis() - startTime);

        return status;
    }

    /**
     * Picks the delay level for the next redelivery from the number of retries so far.
     *
     * <p>Read against the {@code messageDelayLevel} table noted on the constants
     * (assuming broker levels are 1-based — confirm against broker config), the
     * successive delays are: 10s 1m 5m 10m 20m 30m 40m 50m 1h 2h 6h. Retry counts
     * beyond the table keep using the last level; negative counts are treated as 0.
     *
     * @param reConsumeTimes number of times the message has already been retried
     * @return the delay level to use for the next redelivery
     */
    public int getDelayLevelWhenNextConsume(int reConsumeTimes) {
        // Clamp into the table so neither negative nor large counts go out of bounds.
        int index = Math.min(Math.max(reConsumeTimes, 0), DELAY_LEVELS.length - 1);
        return DELAY_LEVELS[index];
    }

    /**
     * Decodes a message body with {@code HessianUtils}.
     *
     * @param msg the raw message, may be {@code null}
     * @return the decoded payload, or {@code null} if {@code msg} is {@code null}
     *         or decoding fails with an {@link IOException}
     */
    private Serializable decodeMsg(MessageExt msg) {

        if (msg == null) {
            return null;
        }

        try {
            return HessianUtils.decode(msg.getBody());
        } catch (IOException e) {
            logger.error("反序列化出错!" + e.getMessage(), e);
            return null;
        }
    }

    /**
     * Common failure handling for subclasses: marks the transaction rollback-only
     * and decides whether the message should be retried.
     *
     * <p>Once the message has been retried {@code maxRetryCount} times or more, it
     * is handed to {@link #doLogErrorConsumeMessage(MsgObj)} and acknowledged so it
     * is not redelivered again.
     *
     * @param status        transaction to mark rollback-only; {@code null} is tolerated
     *                      for callers that run without a transaction
     * @param e             the failure cause, may be {@code null}
     * @param msgObj        the message wrapper being consumed
     * @param maxRetryCount retry limit for this call (takes precedence over the field
     *                      of the same name)
     * @return {@code CONSUME_SUCCESS} when retries are exhausted, otherwise
     *         {@code RECONSUME_LATER}
     */
    protected ConsumeConcurrentlyStatus exceptionConsumeConcurrentlyStatus(TransactionStatus status, Throwable e, MsgObj msgObj, int maxRetryCount) {

        logger.error("mq consume failed", e);

        if (status != null) {
            status.setRollbackOnly();
        }

        if (msgObj.getReConsumeTimes() < maxRetryCount) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        logger.error("retryCount: {}, msgs: {}, context: {}", maxRetryCount, msgObj, msgObj.getContext());

        msgObj.setErrorMsg(e == null ? null : e.getMessage());
        doLogErrorConsumeMessage(msgObj);

        // Acknowledge so the message is not redelivered after the limit.
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}